余烬缀记

因为 SSE 导致 Axum 无法优雅关闭的处理办法

image_20240603171214.png

每次使用 Docker 中止写的代码都会超过宽限时间被强行中止,这并不是一件好事,因此需要添加处理关闭的逻辑。在使用时发现 sse 导致 axum 无法退出,因此记录一下

当使用命令 docker stop xxx 时,Docker 会向容器里应用的进程发送 SIGTERM 信号,因此需要监听该信号。

这里使用 tokio_util::sync::CancellationToken 来传递即将停止的消息,在接收到 SIGTERM 信号后就调用 CancellationTokencancel 方法

# 创建 axum 服务

let shutdown_signal = CancellationToken::new();

let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;

tokio::spawn(async move {
    sigterm.recv().await;
    shutdown_signal.cancel();
})

shutdown_signal 需要储存在 axumstate 中,axumwith_graceful_shutdown 也需要使用到它

tokio::spawn(async move{  
    let (tx, _) = tokio::sync::broadcast::channel(8);
    let state = AppState{
        shutdown_signal: shutdown_signal.clone(),
        broadcast: tx // sse 的广播通道也记录于此
    }let routes = /**/
    axum::serve(
        listener,
        routes.with_state(state)
    )
    .with_graceful_shutdown(async move {  
        shutdown_signal.cancelled().await;  
    })  
    .await
})

# SSE 处理

存在 sse 会导致 axum 无法关闭,具体可查看 Graceful shutdown never stops the server when there is an open sse connection,因此需要监听 shutdown_signal。前面将 shutdown_signal 记录在了 axumstate 中,读取还是很方便。

use axum::{  
    extract::State,
    response::{sse, Sse},  
    BoxError,  
};  
use serde_json::json;
use tokio_stream::wrappers::BroadcastStream;
use futures::stream;
use tokio_stream::StreamExt;

pub async sse(
    State(state): State<AppState>
) -> Sse<impl tokio_stream::Stream<Item = Result<sse::Event, BoxError>>>{
    // 这是 SSE 的事件广播
    let receiver = state.broadcast.subscribe();
    // 将广播转化为 stream
    let notify_stream = BroadcastStream::new(receiver).map(|it| -> Result<sse::Event, BoxError> {  
            match it {  
                Ok(payload) => Ok(sse::Event::default().data(payload.to_json())),  
                Err(err) => {  
                    tracing::error!(reason = ?err, "failed to read broadcast message.");  
                    Err(Box::new(err))  
                }  
            }  
        },  
    );
    // 创建一个心跳 stream
    let heart_stream = stream::repeat_with(|| {  
        let now = SystemTime::now()  
            .duration_since(std::time::UNIX_EPOCH)  
            .unwrap_or_default();  
        sse::Event::default().data(  
            json!({  
                "type": "HEART",  
                "time": now.as_millis()  
            })  
            .to_string(),  
        )  
    })  
    .map(|it| -> Result<sse::Event, BoxError> { Ok(it) })
    .throttle(Duration::from_secs(1));
    
    // 组合两个 stream
    let combined_stream = stream::select(notify_stream, heart_stream);
    
    // 将其包装成可中止的 stream
    let (combined_stream, handler) = stream::abortable(combined_stream);
    
    // 当 shutdown_signal 取消时,中止 stream
    tokio::spawn(async move {  
        shutdown_signal.cancelled().await;  
        handler.abort()  
    });
    
    // 返回
    Sse::new(combined_stream).keep_alive(  
        sse::KeepAlive::new()  
            .interval(Duration::from_secs(1))  
            .text("keep"),  
    )
}

sse 关闭后,axum 便能正常停止了